{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Showcasing Dataset and PipelineParameter\n",
        "\n",
        "This notebook demonstrates how a [**FileDataset**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) or [**TabularDataset**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabulardataset?view=azure-ml-py) can be parametrized with [**PipelineParameters**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) in an AML [Pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline(class)?view=azure-ml-py). By parametrizing datasets, you can dynamically run pipeline experiments with different datasets without changing any code.\n",
        "\n",
        "A common use case is building a training pipeline with a sample of your training data for quick iterative development. When you're ready to test and deploy your pipeline at scale, you can pass your full training dataset to the pipeline experiment without changing your training script.\n",
        "\n",
        "To learn more about how parameters work between steps, see [aml-pipelines-with-data-dependency-steps](https://aka.ms/pl-data-dep).\n",
        "\n",
        "* [How to create a Pipeline with a Dataset PipelineParameter](#index1)\n",
        "* [How to submit a Pipeline with a Dataset PipelineParameter](#index2)\n",
        "* [How to submit a Pipeline and change the Dataset PipelineParameter value from the SDK](#index3)\n",
        "* [How to submit a Pipeline and change the Dataset PipelineParameter value using a REST call](#index4)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Azure Machine Learning and Pipeline SDK-specific imports"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import azureml.core\n",
        "from azureml.core import Workspace, Experiment, Dataset, RunConfiguration\n",
        "from azureml.core.compute import ComputeTarget, AmlCompute\n",
        "from azureml.core.environment import CondaDependencies\n",
        "from azureml.data.dataset_consumption_config import DatasetConsumptionConfig\n",
        "from azureml.widgets import RunDetails\n",
        "\n",
        "from azureml.pipeline.core import PipelineParameter\n",
        "from azureml.pipeline.core import Pipeline, PipelineRun\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Initialize Workspace\n",
        "\n",
        "Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure the config file is present at `.\\config.json`.\n",
        "\n",
        "If you don't have a config.json file, go through the [configuration Notebook](https://aka.ms/pl-config) first.\n",
        "\n",
        "This sets you up with a working config file that contains your workspace information, subscription ID, and so on."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create an Azure ML experiment\n",
        "\n",
        "Let's create an experiment named \"showcasing-dataset\" and use the current directory as the source directory for the training script. The script runs will be recorded under the experiment in Azure."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Choose a name for the run history container in the workspace.\n",
        "experiment_name = 'showcasing-dataset'\n",
        "source_directory = '.'\n",
        "\n",
        "experiment = Experiment(ws, experiment_name)\n",
        "experiment"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create or Attach an AmlCompute cluster\n",
        "You will need to create a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for your pipeline run. In this tutorial, you use an `AmlCompute` cluster as your training compute resource.\n",
        "\n",
        "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Choose a name for your cluster.\n",
        "amlcompute_cluster_name = \"cpu-cluster\"\n",
        "\n",
        "# Check if this compute target already exists in the workspace.\n",
        "cts = ws.compute_targets\n",
        "if amlcompute_cluster_name in cts and cts[amlcompute_cluster_name].type == 'AmlCompute':\n",
        "    print('Found existing compute target.')\n",
        "    compute_target = cts[amlcompute_cluster_name]\n",
        "else:\n",
        "    print('Creating a new compute target...')\n",
        "    provisioning_config = AmlCompute.provisioning_configuration(\n",
        "        vm_size=\"STANDARD_D2_V2\",  # for GPU, use \"Standard_NC6s_v3\"\n",
        "        # vm_priority='lowpriority',  # optional\n",
        "        max_nodes=4)\n",
        "\n",
        "    # Create the cluster.\n",
        "    compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, provisioning_config)\n",
        "\n",
        "    # Can poll for a minimum number of nodes and for a specific timeout.\n",
        "    # If no min_node_count is provided, it will use the scale settings for the cluster.\n",
        "    compute_target.wait_for_completion(show_output=True, timeout_in_minutes=10)\n",
        "\n",
        "    # For a more detailed view of current AmlCompute status, use get_status()."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Dataset Configuration\n",
        "\n",
        "The following steps detail how to create a [FileDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) and [TabularDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabulardataset?view=azure-ml-py) from an external CSV file, and configure them to be used by a [Pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline(class)?view=azure-ml-py):\n",
        "\n",
        "1. Create a dataset from a CSV file.\n",
        "2. Create a [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) object and set the `default_value` to the dataset. [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) objects enable arguments to be passed into Pipelines when they are resubmitted after creation. The `name` is referenced later on when we submit additional pipeline runs with different input datasets.\n",
        "3. Create a [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) object from the [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py). The [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) object specifies how the dataset should be used by the remote compute where the pipeline is run. **NOTE:** Only [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) objects built on [FileDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) can be set `as_mount()` or `as_download()` on the remote compute."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "datapath-remarks-sample"
        ]
      },
      "outputs": [],
      "source": [
        "file_dataset = Dataset.File.from_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')\n",
        "file_pipeline_param = PipelineParameter(name=\"file_ds_param\", default_value=file_dataset)\n",
        "file_ds_consumption = DatasetConsumptionConfig(\"file_dataset\", file_pipeline_param).as_mount()\n",
        "\n",
        "tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')\n",
        "tabular_pipeline_param = PipelineParameter(name=\"tabular_ds_param\", default_value=tabular_dataset)\n",
        "tabular_ds_consumption = DatasetConsumptionConfig(\"tabular_dataset\", tabular_pipeline_param)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "We will set up a training script to ingest our passed-in datasets and print their contents. **NOTE:** The names of the datasets referenced inside the training script correspond to the `name` of their respective [DatasetConsumptionConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.dataset_consumption_config.datasetconsumptionconfig?view=azure-ml-py) objects defined above."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%%writefile train_with_dataset.py\n",
        "from azureml.core import Run\n",
        "\n",
        "run = Run.get_context()\n",
        "\n",
        "# The FileDataset is mounted on the compute target, so the input is a local path.\n",
        "input_file_ds_path = run.input_datasets['file_dataset']\n",
        "with open(input_file_ds_path, 'r') as f:\n",
        "    content = f.read()\n",
        "    print(content)\n",
        "\n",
        "# The TabularDataset is passed as a dataset object that can be loaded into pandas.\n",
        "input_tabular_ds = run.input_datasets['tabular_dataset']\n",
        "tabular_df = input_tabular_ds.to_pandas_dataframe()\n",
        "print(tabular_df)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<a id='index1'></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create a Pipeline with a Dataset PipelineParameter\n",
        "\n",
        "Note that the ```file_ds_consumption``` and ```tabular_ds_consumption``` are specified as both arguments and inputs to create a step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "conda_dep = CondaDependencies()\n",
        "conda_dep.add_pip_package(\"pandas\")\n",
        "\n",
        "run_config = RunConfiguration(conda_dependencies=conda_dep)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "train_step = PythonScriptStep(\n",
        "    name=\"train_step\",\n",
        "    script_name=\"train_with_dataset.py\",\n",
        "    arguments=[\"--param1\", file_ds_consumption, \"--param2\", tabular_ds_consumption],\n",
        "    inputs=[file_ds_consumption, tabular_ds_consumption],\n",
        "    compute_target=compute_target,\n",
        "    source_directory=source_directory,\n",
        "    runconfig=run_config)\n",
        "\n",
        "print(\"train_step created\")\n",
        "\n",
        "pipeline = Pipeline(workspace=ws, steps=[train_step])\n",
        "print(\"pipeline with the train_step created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<a id='index2'></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Submit a Pipeline with a Dataset PipelineParameter\n",
        "\n",
        "Pipelines can be submitted with default values of PipelineParameters by not specifying any parameters."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Pipeline will run with default file_ds and tabular_ds\n",
        "pipeline_run = experiment.submit(pipeline)\n",
        "print(\"Pipeline is submitted for execution\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<a id='index3'></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Submit a Pipeline with a different Dataset PipelineParameter value from the SDK\n",
        "\n",
        "The training pipeline can be reused with different input datasets by passing them in as PipelineParameters."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "iris_file_ds = Dataset.File.from_files('https://raw.githubusercontent.com/Azure/MachineLearningNotebooks/'\n",
        "                                        '4e7b3784d50e81c313c62bcdf9a330194153d9cd/how-to-use-azureml/work-with-data/'\n",
        "                                        'datasets-tutorial/train-with-datasets/train-dataset/iris.csv')\n",
        "\n",
        "iris_tabular_ds = Dataset.Tabular.from_delimited_files('https://raw.githubusercontent.com/Azure/MachineLearningNotebooks/'\n",
        "                                                       '4e7b3784d50e81c313c62bcdf9a330194153d9cd/how-to-use-azureml/work-with-data/'\n",
        "                                                       'datasets-tutorial/train-with-datasets/train-dataset/iris.csv')"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run_with_params = experiment.submit(\n",
        "    pipeline,\n",
        "    pipeline_parameters={'file_ds_param': iris_file_ds, 'tabular_ds_param': iris_tabular_ds})"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "RunDetails(pipeline_run_with_params).show()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run_with_params.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "<a id='index4'></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Dynamically Set the Dataset PipelineParameter Values using a REST Call\n",
        "\n",
        "Let's publish the pipeline we created previously, so we can generate a pipeline endpoint. We can then submit the iris datasets to the pipeline REST endpoint by passing in their IDs. "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline = pipeline.publish(name=\"Dataset_Pipeline\", description=\"Pipeline to test Dataset PipelineParameter\", continue_on_step_failure=True)\n",
        "published_pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline.submit(ws, experiment_name=\"publishedexperiment\", pipeline_parameters={'file_ds_param': iris_file_ds, 'tabular_ds_param': iris_tabular_ds})"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.authentication import InteractiveLoginAuthentication\n",
        "import requests\n",
        "\n",
        "auth = InteractiveLoginAuthentication()\n",
        "aad_token = auth.get_authentication_header()\n",
        "\n",
        "rest_endpoint = published_pipeline.endpoint\n",
        "\n",
        "print(\"You can perform HTTP POST on URL {} to trigger this pipeline\".format(rest_endpoint))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Pass the dataset IDs as PipelineParameter values when triggering the pipeline.\n",
        "response = requests.post(rest_endpoint,\n",
        "                         headers=aad_token,\n",
        "                         json={\"ExperimentName\": \"MyRestPipeline\",\n",
        "                               \"RunSource\": \"SDK\",\n",
        "                               \"DataSetDefinitionValueAssignments\": {\"file_ds_param\": {\"SavedDataSetReference\": {\"Id\": iris_file_ds.id}},\n",
        "                                                                     \"tabular_ds_param\": {\"SavedDataSetReference\": {\"Id\": iris_tabular_ds.id}}}\n",
        "                              }\n",
        "                        )"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "try:\n",
        "    response.raise_for_status()\n",
        "except requests.exceptions.HTTPError as err:\n",
        "    # Re-raise with the response details, keeping the original error chained.\n",
        "    raise Exception('Received bad response from the endpoint: {}\\n'\n",
        "                    'Response Code: {}\\n'\n",
        "                    'Headers: {}\\n'\n",
        "                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content)) from err\n",
        "\n",
        "run_id = response.json().get('Id')\n",
        "print('Submitted pipeline run: ', run_id)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline_run_via_rest = PipelineRun(ws.experiments[\"MyRestPipeline\"], run_id)\n",
        "RunDetails(published_pipeline_run_via_rest).show()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "published_pipeline_run_via_rest.wait_for_completion()"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "rafarmah"
      }
    ],
    "category": "tutorial",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "How to use Dataset as a PipelineParameter",
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "order_index": 13.0,
    "star_tag": [
      "featured"
    ],
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of Dataset as a PipelineParameter"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}